package com.bigfans.framework.kafka;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;

import java.util.Properties;

/**
 * Factory for Kafka clients built from externally supplied {@link Properties}.
 *
 * <p>Consumers are created fresh on every call. The producer is created lazily
 * on first use and then shared; it is closed by a JVM shutdown hook that every
 * constructor registers, or earlier via {@link #closeProducer()}.
 *
 * <p>Raw {@code KafkaConsumer} / {@code Producer} types are kept in the public
 * signatures so existing callers continue to compile unchanged.
 */
public class KafkaFactory {

    private Properties producerProperties;
    private Properties consumerProperties;
    /** Lazily created shared producer; {@code null} until first use and again after close. */
    private volatile Producer producer;

    /**
     * Creates a factory without configuration. Properties must be supplied through
     * {@link #setProducerProperties(Properties)} and
     * {@link #setConsumerProperties(Properties)} before clients are requested.
     */
    public KafkaFactory() {
        registerShutdownHook();
    }

    /**
     * Creates a fully configured factory.
     *
     * @param producerProperties configuration used to build the shared producer
     * @param consumerProperties base configuration used to build consumers
     */
    public KafkaFactory(Properties producerProperties, Properties consumerProperties) {
        this.producerProperties = producerProperties;
        this.consumerProperties = consumerProperties;
        // The no-arg constructor always installed this hook; do the same here so the
        // shared producer is closed on JVM exit regardless of how the factory was built.
        registerShutdownHook();
    }

    public void setConsumerProperties(Properties consumerProperties) {
        this.consumerProperties = consumerProperties;
    }

    public void setProducerProperties(Properties producerProperties) {
        this.producerProperties = producerProperties;
    }

    public Properties getProducerProperties() {
        return producerProperties;
    }

    public Properties getConsumerProperties() {
        return consumerProperties;
    }

    /**
     * Creates a new consumer that uses the given client id.
     *
     * <p>The client id is applied to a private copy of the consumer properties, so the
     * shared configuration is not modified and later consumers do not inherit this id.
     *
     * @param clientId value for {@code ConsumerConfig.CLIENT_ID_CONFIG}
     * @return a new consumer; the caller is responsible for closing it
     * @throws NullPointerException if the consumer properties have not been set
     */
    public KafkaConsumer createConsumer(String clientId) {
        Properties properties = copyOfConsumerProperties();
        properties.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, clientId);
        return new KafkaConsumer(properties);
    }

    /**
     * Creates a new consumer from the configured consumer properties.
     *
     * @return a new consumer; the caller is responsible for closing it
     * @throws NullPointerException if the consumer properties have not been set
     */
    public KafkaConsumer createConsumer() {
        return new KafkaConsumer(copyOfConsumerProperties());
    }

    /**
     * Returns the shared producer, creating it on first use (or on first use after
     * {@link #closeProducer()}).
     *
     * @return the shared producer instance
     */
    public Producer getProducer() {
        // Double-checked locking on the volatile field: the common path is a single
        // volatile read, and only the first caller pays for the lock and construction.
        Producer current = producer;
        if (current == null) {
            synchronized (this) {
                current = producer;
                if (current == null) {
                    current = new KafkaProducer(getProducerProperties());
                    producer = current;
                }
            }
        }
        return current;
    }

    /**
     * Closes the shared producer if one has been created. Calling this when no producer
     * exists is a no-op. After it returns, {@link #getProducer()} builds a new producer
     * instead of handing out the closed one.
     */
    public void closeProducer() {
        synchronized (this) {
            Producer current = producer;
            if (current != null) {
                // Clear the field first so a closed (or half-closed, if close() throws)
                // producer can never be returned by getProducer().
                producer = null;
                current.close();
            }
        }
    }

    /** Registers a JVM shutdown hook that closes the shared producer. */
    private void registerShutdownHook() {
        Runtime.getRuntime().addShutdownHook(new Thread() {
            @Override
            public void run() {
                KafkaFactory.this.closeProducer();
            }
        });
    }

    /** Returns a private copy of the consumer properties that callers may safely modify. */
    private Properties copyOfConsumerProperties() {
        Properties base = java.util.Objects.requireNonNull(
                getConsumerProperties(), "consumerProperties has not been set");
        // clone() copies the entries and keeps any defaults table attached to the original.
        return (Properties) base.clone();
    }
}
